关于kafka的offset存储
消费者通过offset控制消费的进度,这里有几个概念先解释一下.
- Offset: 每个ConsumerGroup中针对一个topic的每个Partition的消费进度.通过这个来控制消费进度.
- LogSize: Kafka的数据位置,随着新的数据到来而增加.
- Lag: LogSize - Offset . 指落后的大小.
因此正常Consumer的不堆积是Lag的值处于比较小的范围,比如 0~1000.
然而,存在的一些问题:
- 那随着数据量的增加,offset和logSize的值一直增加,到超过int的范围吗,还是有清零的规则.(应该是有相应的机制,这个不重要了)
- 有关offset的一些注意点如下
存储位置
从kafka-0.9版本及以后,消费者组和offset信息就不存在zk中了,而是存到broker服务器上.存放在一个叫__consumer_offsets的topic中.
关于offset的消费者参数
auto.offset.reset
1 | earliest |
也就是说,这个参数的指定只有在新的consumer group添加的时候,或者其他原因导致分区上的offset没有了的情况,才更有意义.
那随之又有的问题:
- 如果为了能消费新的数据,而对于老的customer-group,不想消费堆积的数据. 或者说想废弃掉这个group了,那不用之后会有什么影响
- 另外,对于无止尽的customer-group创建,对kafka集群有什么影响吗,当然不仅仅是新group替代旧的group.而是还有在用group的增多,会对集群有什么影响?
下面详细总结下
1. 废弃group的增多
个人理解: group增多,增加了对group的管理成本,那对于不用的group,存放在broker中,不会对其它造成影响.
目前只是猜测,具体再详细研究.
2. 在用group的增多
对于老版本(zk管理customer信息和offset), 会增加customer与zk的交互成本.
新版本(大于0.9), customer信息和offset由broker管理,只是增加了customer与broker的交互, 然而这一部分交互信息对于整个数据流来说微乎其微,所以影响应该不大.
需要在研究下offset更新的流程(customer与broker)
再聊聊kafka的group coordinator
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
- 在 Server 端增加了 GroupCoordinator 这个角色
- 将 topic 的 offset 信息由之前存储在 zookeeper(/consumers/
/offsets/ / ,zk写操作性能不高) 上改为存储到一个特殊的 topic 中(__consumer_offsets)
1. rebalance时机
- 有新的consumer加入
- 旧的consumer挂了
- coordinator挂了,集群选举出新的coordinator
- topic的partition新加
- consumer调用unsubscrible(),取消topic的订阅
2. __consumer_offsets
Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。
这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)
偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。
感觉其实用HashMap应该更好一些,因为通过key来获取或管理offset(偏移量-value)
因为这种存储方式(队列), find的时间复杂度为O(n), 需要遍历整个__consumer_offsets,扫描全部偏移量topic日志.
因此集群的内存中也是维护了一份最近的记录,为了能在指定key的情况下能够快速的给出OffsetFetchRequests而不用扫描全部偏移量topic日志.
如果偏移量管理者因某种原因失败,新的broker将会成为偏移量管理者并且通过扫描偏移量topic来重新生成偏移量缓存。
ps: 内存中应该是Map结构,那内存中的记录与偏移量topic(__consumer_offsets)的数据怎么保证一致性的呢??
3. Consumer与Consumer Group
consumer group是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
consumer instance可以是一个进程,也可以是一个线程.